#ifndef __M_MSG_H__
#define __M_MSG_H__
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"
#include <iostream>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <list>

namespace mymq
{
    #define DATAFILE_SUBFIX ".mqd"
    #define TMPFILE_SUBFIX ".mqd.tmp"

    using MessagePtr = std::shared_ptr<mymq::Message>;
    class MessageMapper
    {
        public:
            MessageMapper(std::string& basedir,const std::string& qname):
            _qname(qname)
            {
                if(basedir.back() != '/') basedir.push_back('/');
                _datafile = basedir + qname + DATAFILE_SUBFIX;
                _tmpfile = basedir + qname + TMPFILE_SUBFIX;
                if(FileHelper(basedir).exists() == false)
                    assert(FileHelper::createDirectory(basedir));
                createMsgFile();
            }

            bool createMsgFile()
            {
                if(FileHelper(_datafile).exists() == true) return true;
                bool ret = FileHelper::createFile(_datafile);
                if(ret == false)
                {
                    DLOG("创建队列文件 %s 失败",_datafile.c_str());
                    return false;
                }
                return true;
            }

            void removeMsgFile()
            {
                FileHelper::removeFile(_datafile);
                FileHelper::removeFile(_tmpfile);
            }

            bool insert(MessagePtr& msg)
            {
                return insert(_datafile,msg);
            }

            bool remove(const MessagePtr& msg)
            {
                // 1.将msg中的有效标志位修改为'0'
                msg->mutable_payload()->set_valid("0");
                // 2.对msg进行序列化
                std::string body = msg->payload().SerializeAsString();
                if(body.size() != msg->length())
                {
                    DLOG("不能修改文件中的数据信息,新生成的数据与原数据长度不一致!");
                    return false;
                }
                // 3.将序列化后的消息，写入到数据文件中指定位置（覆盖原有数据）
                FileHelper helper(_datafile);
                bool ret = helper.write(body.c_str(),msg->offset(),body.size());
                if(ret == false)
                {
                    DLOG("向队列数据文件写入数据失败!");
                    return false;
                }
                //DLOG("确认消息后，删除持久化消息成功：%s ",msg->payload().body().c_str());
                return true;
            }

            std::list<MessagePtr> gc()
            {   
                bool ret;
                std::list<MessagePtr> result;
                // 1.加载出文件中所有的有效数据;存储格式： 4字节长度|数据|4字节长度|数据
                ret = load(result);
                if (ret == false) 
                {
                    DLOG("加载有效数据失败!");
                    return result;
                }
                //DLOG("垃圾回收，得到有效消息数量：%ld",result.size());
                // 2.将有效数据进行序列化存储到临时文件中
                FileHelper::createFile(_tmpfile);
                for(auto& msg : result)
                {
                    DLOG("向临时文件写入数据: %s", msg->payload().body().c_str());
                    ret = insert(_tmpfile,msg);
                    if (ret == false) 
                    {
                        DLOG("向临时文件写入消息数据失败!");
                        return result;
                    }
                }
                //DLOG("垃圾回收后临时文件大小:%ld",FileHelper(_tmpfile).size());
                // 3.删除源文件
                ret = FileHelper::removeFile(_datafile);
                if (ret == false) 
                {
                    DLOG("删除源文件失败!");
                    return result;
                }
                // 4.修改临时文件名，改为原文件名
                ret = FileHelper(_tmpfile).rename(_datafile);
                if (ret == false) 
                {
                    DLOG("修改临时文件名称失败!");
                    return result;
                }
                // 5.返回新的有效数据
                return result;
            }
        private:
            bool load(std::list<MessagePtr> &result)
            {
                // 加载出文件中所有的有效数据;存储格式： 4字节长度|数据|4字节长度|数据
                FileHelper data_file_helper(_datafile);
                size_t offset = 0,msg_size;
                size_t fsize = data_file_helper.size();
                bool ret;
                while(offset < fsize)
                {
                    ret = data_file_helper.read((char*)&msg_size,offset,sizeof(size_t));
                    if (ret == false) 
                    {
                        DLOG("读取消息长度失败!\n");
                        return false;
                    }
                    offset += sizeof(size_t);
                    std::string msg_body(msg_size,'\0');
                    ret = data_file_helper.read(&msg_body[0],offset,msg_size);
                    if (ret == false) 
                    {
                        DLOG("读取消息数据失败!\n");
                        return false;
                    }
                    offset += msg_size;
                    MessagePtr msgp = std::make_shared<Message>();
                    msgp->mutable_payload()->ParseFromString(msg_body);
                    if(msgp->payload().valid() == "0")
                    {
                        DLOG("加载到无效消息：%s !\n",msgp->payload().body().c_str());
                        continue;
                    }
                    // 保存有效消息
                    result.push_back(msgp);
                }
                return true;
            }

            bool insert(const std::string& filename, MessagePtr& msg)
            {
                // 新增的数据添加到文件末尾
                // 1.进行消息的序列化，获取到格式化后的信息
                std::string body = msg->payload().SerializeAsString();
                // 2.获取文件长度
                FileHelper helper(filename);
                size_t fsize = helper.size();
                size_t msg_size = body.size();
                // 写入文件前要先写入4字节的数据长度，这是我们规定的存储格式
                bool ret = helper.write((char*)&msg_size,fsize,sizeof(size_t));
                if(ret == false)
                {
                    DLOG("向队列数据文件写入数据长度失败!");
                    return false;
                }
                // 3.将数据写入文件的指定位置
                ret = helper.write(body.c_str(),fsize + sizeof(size_t),body.size());
                if(ret == false)
                {
                    DLOG("向队列数据文件写入数据失败!");
                    return false;
                }
                // 4.更新msg中的实际存储信息
                msg->set_offset(fsize + sizeof(size_t));
                msg->set_length(body.size());
                return true;
            }

        private:
            std::string _qname;
            std::string _datafile;
            std::string _tmpfile;
    };

    class QueueMessage
    {
        public:
            using ptr = std::shared_ptr<QueueMessage>;
            QueueMessage(std::string& basedir,const std::string& qname):
            _mapper(basedir,qname),_qname(qname),_valid_count(0),_total_count(0)
            {}

            bool recovery()
            {
                // 恢复历史消息
                std::unique_lock<std::mutex> lock(_mutex);
                _msgs = _mapper.gc();
                for(auto& msg:_msgs)
                {
                    _durable_msgs.insert(std::make_pair(msg->payload().properties().id(),msg));    
                }
                _valid_count = _total_count = _msgs.size();
                return true;
            }

            bool insert(const BasicProperties* bp,const std::string& body, bool queue_is_durable)
            {
                // 1.构造消息对象
                MessagePtr msg = std::make_shared<Message>();
                msg->mutable_payload()->set_body(body);
                if(bp != nullptr)
                {
                    DeliveryMode mode = queue_is_durable?(bp->delivery_mode()):(DeliveryMode::UNDURABLE);
                    msg->mutable_payload()->mutable_properties()->set_id(bp->id());
                    msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
                    msg->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key());
                }
                else
                {
                    DeliveryMode mode = queue_is_durable?(DeliveryMode::DURABLE) : (DeliveryMode::UNDURABLE);
                    msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());
                    msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
                    msg->mutable_payload()->mutable_properties()->set_routing_key("");
                }

                std::unique_lock<std::mutex> lock(_mutex);
                // 2.判断消息是否需要持久化
                if(msg->payload().properties().delivery_mode() == DeliveryMode::DURABLE)
                {
                    msg->mutable_payload()->set_valid("1");// 持久化存储中表示数据有效
                    // 3.进行持久化存储
                    bool ret = _mapper.insert(msg);
                    if(ret == false)
                    {
                        DLOG("持久化存储消息失败: %s !",body.c_str());
                        return false;
                    }
                    _valid_count += 1;
                    _total_count += 1;
                    _durable_msgs.insert(std::make_pair(msg->payload().properties().id(),msg));
                }
                // 4.内存的管理
                _msgs.push_back(msg);
                return true;
            }

            MessagePtr front()
            {
                std::unique_lock<std::mutex> lock(_mutex);
                if(_msgs.size() == 0)
                {
                    return MessagePtr();
                }
                // 从msgs中取出一条队首消息，然后删除，添加到待确认消息hash中
                MessagePtr msg = _msgs.front();
                _msgs.pop_front();
                _waitack_msgs.insert(std::make_pair(msg->payload().properties().id(),msg));
                return msg;
            }

            bool remove(const std::string& msg_id)// 每次删除消息后判断是否需要消息回收
            {
                std::unique_lock<std::mutex> lock(_mutex);
                // 1.从待确认队列中查找消息
                auto it = _waitack_msgs.find(msg_id);
                if(it == _waitack_msgs.end())
                {
                    DLOG("没有找到要删除的消息: %s ",msg_id.c_str());
                    return true;
                }
                // 2.根据消息的持久化模式，决定是否删除持久化信息
                if(it->second->payload().properties().delivery_mode() == DeliveryMode::DURABLE)
                {
                    // 3.删除持久化信息
                    _mapper.remove(it->second);
                    _durable_msgs.erase(msg_id);
                    _valid_count -= 1;
                    gc(); // 内部判断是否需要垃圾回收
                }
                // 4.删除内存中的信息
                _waitack_msgs.erase(msg_id);
                //DLOG("确认消息后，删除内存中消息成功：%s ",it->second->payload().body().c_str());
                return true;
            }

            size_t push_count()
            {
                std::unique_lock<std::mutex> lock(_mutex);
                return _msgs.size();
            }

            size_t total_count()
            {
                std::unique_lock<std::mutex> lock(_mutex);
                return _total_count;
            }

            size_t durable_count()
            {
                std::unique_lock<std::mutex> lock(_mutex);
                return _durable_msgs.size();
            }

            size_t waitack_count()
            {
                std::unique_lock<std::mutex> lock(_mutex);
                return _waitack_msgs.size();
            }

            void clear()
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _mapper.removeMsgFile();
                _msgs.clear();
                _durable_msgs.clear();
                _waitack_msgs.clear();
                _valid_count = 0;
                _total_count = 0;
            }

        private:
            bool GCCheck()
            {
                // 持久化的消息总量大于2000 且 其中有效比例低于50%则需要清理
                if(_total_count > 2000 && (_valid_count * 10 / _total_count) < 5)
                {
                    return true;
                }
                return false;
            }

            void gc()
            {
                // 1.进行垃圾回收，获取回收后有效消息的链表
                if(GCCheck() == false) return;
                std::list<MessagePtr> msgs = _mapper.gc();
                for(auto& msg:msgs)
                {
                    auto it = _durable_msgs.find(msg->payload().properties().id());
                    if(it == _durable_msgs.end())
                    {
                        DLOG("垃圾回收后，有一条持久化消息在内存中没有进行管理!");
                        _msgs.push_back(msg);// 处理方法：添加到链表中
                        _durable_msgs.insert(std::make_pair(msg->payload().properties().id(),msg));
                        continue;
                    }
                    // 2.更新每一条消息的实际存储位置
                    it->second->set_offset(msg->offset());
                    it->second->set_length(msg->length());
                }
                // 3.更新当前的有效消息数量 和 总的持久化消息数量
                _valid_count = _total_count = msgs.size();
            }

        private:
            std::mutex _mutex;
            std::string _qname;
            size_t _valid_count;
            size_t _total_count;
            MessageMapper _mapper;
            std::list<MessagePtr> _msgs;// 待推送消息链表
            std::unordered_map<std::string,MessagePtr> _durable_msgs;// 持久化消息hash
            std::unordered_map<std::string,MessagePtr> _waitack_msgs;// 待确认消息hash
    };

    class MessageManager
    {
        public:
            using ptr = std::shared_ptr<MessageManager>;
            MessageManager(const std::string& basedir):
            _basedir(basedir)
            {}

            void clear()
            {
                std::unique_lock<std::mutex> lock(_mutex);
                for(auto& qmsg:_queue_msgs)
                {
                    qmsg.second->clear();
                }
            }

            void InitQueueMessage(const std::string& qname)
            {
                QueueMessage::ptr qmp;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _queue_msgs.find(qname);
                    if(it != _queue_msgs.end())
                    {
                        return;
                    }
                    qmp = std::make_shared<QueueMessage>(_basedir,qname);
                    _queue_msgs.insert(std::make_pair(qname,qmp));
                }
                qmp->recovery();// 该动作是很慢的，最好不要放在锁里面
            }

            void DestroyQueueMessage(const std::string& qname)
            {
                QueueMessage::ptr qmp;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _queue_msgs.find(qname);
                    if(it == _queue_msgs.end())
                    {
                        return;
                    }
                    qmp = it->second;
                    _queue_msgs.erase(it);
                }
                qmp->clear();// 不需要这个锁来保护
            }

            bool insert(const std::string& qname,BasicProperties* bp,const std::string& body,bool queue_is_durable)
            {
                QueueMessage::ptr qmp;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _queue_msgs.find(qname);
                    if(it == _queue_msgs.end())
                    {
                        DLOG("向队列%s新增消息失败:没有找到消息管理句柄!",qname.c_str());
                        return false;
                    }
                    qmp = it->second;
                }
                return qmp->insert(bp,body,queue_is_durable);
            }

            MessagePtr front(const std::string& qname)
            {
                QueueMessage::ptr qmp;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _queue_msgs.find(qname);
                    if(it == _queue_msgs.end())
                    {
                        DLOG("获取队列%s队首消息失败:没有找到消息管理句柄!",qname.c_str());
                        return MessagePtr();
                    }
                    qmp = it->second;
                }
                return qmp->front();
            }

            void ack(const std::string& qname,const std::string& msg_id)
            {
                QueueMessage::ptr qmp;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _queue_msgs.find(qname);
                    if(it == _queue_msgs.end())
                    {
                        DLOG("确认队列%s消息%s失败:没有找到消息管理句柄!",qname.c_str(),msg_id.c_str());
                        return;
                    }
                    qmp = it->second;
                }
                qmp->remove(msg_id);
            }

            size_t push_count(const std::string& qname)
            {
                QueueMessage::ptr qmp;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _queue_msgs.find(qname);
                    if(it == _queue_msgs.end())
                    {
                        DLOG("获取队列%s待推送消息数量失败:没有找到消息管理句柄!",qname.c_str());
                        return 0;
                    }
                    qmp = it->second;
                }
                return qmp->push_count();
            }

            size_t total_count(const std::string& qname)
            {
                QueueMessage::ptr qmp;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _queue_msgs.find(qname);
                    if(it == _queue_msgs.end())
                    {
                        DLOG("获取队列%s总持久化消息数量失败:没有找到消息管理句柄!",qname.c_str());
                        return 0;
                    }
                    qmp = it->second;
                }
                return qmp->total_count();
            }

            size_t durable_count(const std::string& qname)
            {
                QueueMessage::ptr qmp;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _queue_msgs.find(qname);
                    if(it == _queue_msgs.end())
                    {
                        DLOG("获取队列%s有效持久化消息数量失败:没有找到消息管理句柄!",qname.c_str());
                        return 0;
                    }
                    qmp = it->second;
                }
                return qmp->durable_count();
            }

            size_t waitack_count(const std::string& qname)
            {
                QueueMessage::ptr qmp;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _queue_msgs.find(qname);
                    if(it == _queue_msgs.end())
                    {
                        DLOG("获取队列%s待确认消息数量失败:没有找到消息管理句柄!",qname.c_str());
                        return 0;
                    }
                    qmp = it->second;
                }
                return qmp->waitack_count();
            }

        private:
            std::mutex _mutex;
            std::string _basedir;
            std::unordered_map<std::string,QueueMessage::ptr> _queue_msgs;

    };
}

#endif